package com.xj.zookeeper.framework;

import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.net.SocketException;
import java.nio.charset.Charset;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * Convenience wrapper around a single {@link ZooKeeper} handle.
 *
 * <p>The client is created unconnected; call {@link #connect(String)} or
 * {@link #connect(String, String, String)} before any node operation. Every node
 * operation first verifies that the handle is in the {@code CONNECTED} state and
 * wraps any failure in a {@link ZkException}.
 *
 * <p>Author: xiajun<br>
 * Date: 2014-08-07<br>
 * Time: 15:30:00
 */
public class ZkClient {
    private static final Logger log = Logger.getLogger(ZkClient.class);

    /** Payload written to nodes created by {@link #create(String, CreateMode)}. */
    private static final int PLACEHOLDER_DATA_LENGTH = 1;

    /** Handed to the {@link ZkWatcher}; {@link #connect} waits on it in synchronous mode. */
    private final Semaphore connectionLock = new Semaphore(0);
    private final int sessionTimeout;
    private final int connectionTimeout;
    private final boolean asnycConnection;
    private final ZkWatcher watcher;
    protected boolean isAuth = false;
    protected ZooKeeper zooKeeper;
    private String auth;
    private String hosts;
    private String user;

    /**
     * Creates an unconnected ZooKeeper client.
     *
     * @param sessionTimeout    session timeout in milliseconds, passed to the ZooKeeper handle
     * @param asnycConnection   {@code true} to return from {@code connect} without waiting
     *                          for the connection to be established
     * @param connectionTimeout maximum time in milliseconds that a synchronous
     *                          {@code connect} waits for the connection
     */
    public ZkClient(int sessionTimeout, boolean asnycConnection, int connectionTimeout) {
        this.sessionTimeout = sessionTimeout;
        this.asnycConnection = asnycConnection;
        this.connectionTimeout = connectionTimeout;
        this.watcher = new ZkWatcher(connectionLock, asnycConnection);
    }

    /** Creates a synchronous client with a 3000 ms session and connection timeout. */
    public ZkClient() {
        this(3000, false, 3000);
    }

    /**
     * Creates a client with a 3000 ms session and connection timeout.
     *
     * @param asnycConnection {@code true} to connect asynchronously
     */
    public ZkClient(boolean asnycConnection) {
        this(3000, asnycConnection, 3000);
    }

    /**
     * Creates a node.
     *
     * <p>When the client connected with credentials the node is created with
     * {@code CREATOR_ALL_ACL}, otherwise with {@code OPEN_ACL_UNSAFE}.
     *
     * @param path node path
     * @param data node data
     * @param mode node type, e.g. {@code CreateMode.PERSISTENT} or {@code CreateMode.EPHEMERAL}
     * @return the path reported by ZooKeeper for the created node
     * @throws ZkException if the client is not connected or the creation fails
     */
    public String create(String path, byte[] data, CreateMode mode) throws ZkException {
        this.checkStatus();
        try {
            return this.zooKeeper.create(path, data,
                    isAuth ? ZooDefs.Ids.CREATOR_ALL_ACL : ZooDefs.Ids.OPEN_ACL_UNSAFE, mode);
        } catch (Exception e) {
            // Build the message null-safely so a null mode cannot mask the real failure.
            String modeName = (mode == null) ? "null" : mode.name();
            throw new ZkException("create node " + path + ", mode=" + modeName, e);
        }
    }

    /**
     * Creates an ephemeral node.
     *
     * @param path            node path
     * @param data            node data
     * @param timeoutReCreate {@code true} to register the node with the watcher process so
     *                        that it can be re-created after a session timeout and reconnect;
     *                        {@code false} to create a plain ephemeral node
     * @throws ZkException if the client is not connected or the creation fails
     */
    public void create(String path, byte[] data, boolean timeoutReCreate) throws ZkException {
        create(path, data, CreateMode.EPHEMERAL);
        if (!timeoutReCreate) {
            return;
        }
        WatcherProcess process = watcher.getProcess();
        if (process == null) {
            // The process is only installed at the end of connect(); without it the node
            // cannot be registered for re-creation.
            log.warn("not found WatcherProcess instance,node " + path + " will not be re-created after session timeout.");
            return;
        }
        Node node = new Node();
        node.setData(data);
        node.setPath(path);
        process.setStabilizeNode(node);
    }

    /**
     * Creates a persistent node.
     *
     * @param path node path
     * @param data node data
     * @throws ZkException if the client is not connected or the creation fails
     */
    public void create(String path, byte[] data) throws ZkException {
        create(path, data, CreateMode.PERSISTENT);
    }

    /**
     * Creates a node together with any missing ancestors.
     *
     * <p>Missing ancestors are always created as {@code PERSISTENT} nodes, because
     * ephemeral nodes cannot have children and sequential nodes get a generated name;
     * only the final path element is created with {@code mode}. Every node created here
     * receives a one-byte placeholder payload. A {@code null} or blank path is a no-op.
     *
     * @param path absolute node path; surrounding whitespace is ignored
     * @param mode node type of the final path element
     * @return the {@code path} argument, unchanged
     * @throws ZkException if the node already exists, the client is not connected,
     *                     or a creation fails
     */
    public String create(String path, CreateMode mode) throws ZkException {
        if (path == null || path.trim().equals("")) {
            return path;
        }
        String trimmed = path.trim();
        if (exists(trimmed)) {
            throw new ZkException("Node path: " + path + " already exists.");
        }

        String[] segments = trimmed.split("/");
        int leafIndex = -1;
        for (int i = 0; i < segments.length; i++) {
            if (!segments[i].isEmpty()) {
                leafIndex = i;
            }
        }

        StringBuilder current = new StringBuilder();
        for (int i = 0; i <= leafIndex; i++) {
            if (segments[i].isEmpty()) {
                continue;
            }
            current.append('/').append(segments[i]);
            String nodePath = current.toString();
            if (!exists(nodePath)) {
                CreateMode nodeMode = (i == leafIndex) ? mode : CreateMode.PERSISTENT;
                create(nodePath, new byte[PLACEHOLDER_DATA_LENGTH], nodeMode);
            }
        }
        return path;
    }

    /**
     * Deletes a node regardless of its version.
     *
     * @param path node path
     * @throws ZkException if the client is not connected or the deletion fails
     */
    public void delete(String path) throws ZkException {
        this.checkStatus();
        try {
            // -1 matches any node version.
            this.zooKeeper.delete(path, -1);
        } catch (Exception e) {
            throw new ZkException("delete node " + path, e);
        }
    }

    /**
     * Returns the names of the children of a node.
     *
     * @param path    node path
     * @param watcher {@code true} to leave a watch on the node using the client's default watcher
     * @return the child names as returned by ZooKeeper
     * @throws ZkException if the client is not connected or the lookup fails
     */
    public List<String> getChild(String path, boolean watcher) throws ZkException {
        this.checkStatus();
        try {
            return this.zooKeeper.getChildren(path, watcher);
        } catch (Exception e) {
            throw new ZkException("getChildren node " + path, e);
        }
    }

    /**
     * Reads the data of a node.
     *
     * @param path    node path
     * @param watcher {@code true} to leave a watch on the node using the client's default watcher
     * @return the node data as returned by ZooKeeper
     * @throws ZkException if the client is not connected or the read fails
     */
    public byte[] getData(String path, boolean watcher) throws ZkException {
        this.checkStatus();
        try {
            return this.zooKeeper.getData(path, watcher, null);
        } catch (Exception e) {
            throw new ZkException("getData node " + path, e);
        }
    }

    /**
     * Overwrites the data of a node regardless of its version.
     *
     * @param path node path
     * @param data new node data
     * @throws ZkException if the client is not connected or the write fails
     */
    public void setData(String path, byte[] data) throws ZkException {
        this.checkStatus();
        try {
            // -1 matches any node version.
            this.zooKeeper.setData(path, data, -1);
        } catch (Exception e) {
            throw new ZkException("setData node " + path, e);
        }
    }

    /**
     * Tests whether a node exists. No watch is left on the node.
     *
     * @param path node path
     * @return {@code true} if the node exists
     * @throws ZkException if the client is not connected or the check fails
     */
    public boolean exists(String path) throws ZkException {
        this.checkStatus();
        try {
            return this.zooKeeper.exists(path, false) != null;
        } catch (Exception e) {
            throw new ZkException("exists node " + path, e);
        }
    }

    /**
     * Connects to ZooKeeper.
     *
     * <p>Any previous handle that is no longer connected is closed before the new one is
     * opened. In synchronous mode this method waits up to the configured connection
     * timeout for the connection; if the timeout elapses the new handle is closed and a
     * {@link ZkException} is thrown. If the waiting thread is interrupted its interrupt
     * status is restored before the exception is thrown.
     *
     * @param hosts server addresses, e.g. {@code 10.12.147.196:2181,10.12.147.196:2181}
     * @param user  authentication scheme passed to {@code addAuthInfo}; may be {@code null}
     * @param auth  authentication data; may be {@code null}
     * @throws ZkException if the client is already connected, the handle cannot be created,
     *                     the connection times out, or the thread is interrupted
     */
    public synchronized void connect(String hosts, String user, String auth) throws ZkException {
        if (this.checkConnection()) {
            throw new ZkException("Has been connected to the server, please do not repeat connection. host:" + hosts);
        }
        // Release a previous handle (e.g. expired or still connecting) instead of leaking it.
        if (zooKeeper != null) {
            closeHandle(zooKeeper);
        }
        this.hosts = hosts;
        this.user = user;
        this.auth = auth;

        ZooKeeper handle;
        try {
            handle = new ZooKeeper(hosts, sessionTimeout, watcher);
        } catch (IOException e) {
            throw new ZkException("Connect zookeeper hosts=" + hosts, e);
        }
        zooKeeper = handle;

        // Recompute on every connect so a reconnect without credentials does not keep
        // using CREATOR_ALL_ACL from an earlier authenticated session.
        boolean authenticated = user != null && auth != null;
        if (authenticated) {
            handle.addAuthInfo(user, auth.getBytes(Charset.forName("utf-8")));
        }
        isAuth = authenticated;

        if (!asnycConnection) {
            awaitConnection(handle);
        }
        watcher.setWatcherProcess(new WatcherProcess(this), this);
    }

    /**
     * Connects to ZooKeeper without authentication.
     *
     * @param hosts server addresses, e.g. {@code 10.12.147.196:2181,10.12.147.196:2181}
     * @throws ZkException see {@link #connect(String, String, String)}
     */
    public void connect(String hosts) throws ZkException {
        connect(hosts, null, null);
    }

    /**
     * Reconnects using the hosts and credentials of the last {@code connect} call.
     *
     * @throws ZkException see {@link #connect(String, String, String)}
     */
    public void reconnect() throws ZkException {
        connect(hosts, user, auth);
        log.info("reconnection to zookeeper.");
    }

    /**
     * Closes the ZooKeeper handle, whatever its current state.
     *
     * @throws ZkException          if {@code connect} has never been called
     * @throws InterruptedException if the thread is interrupted while closing
     */
    public void close() throws InterruptedException, ZkException {
        // Only require that a handle exists: a handle that is still connecting or has
        // lost its connection must be closable too, otherwise it can never be released.
        requireHandle().close();
    }

    /**
     * Registers a listener for changes to the children of a node.
     *
     * @param listen    listener to register
     * @param instantly {@code true} to invoke the callback immediately on registration,
     *                  whether or not any child has changed
     * @throws ZkException if the client is not connected or registration fails
     */
    public void listenNode(Listener listen, boolean instantly) throws ZkException {
        this.checkStatus();
        WatcherProcess process = findProcess();
        if (process != null) {
            process.listen(listen, true, instantly);
        }
    }

    /**
     * Removes the child-change listener of a node.
     *
     * @param nodePath node path
     * @throws ZkException if the client is not connected or removal fails
     */
    public void unlistenNode(String nodePath) throws ZkException {
        this.checkStatus();
        WatcherProcess process = findProcess();
        if (process != null) {
            process.unlisten(nodePath, true);
        }
    }

    /**
     * Registers a listener for changes to the data of a node.
     *
     * @param listen    listener to register
     * @param instantly {@code true} to invoke the callback immediately on registration,
     *                  whether or not the data has changed
     * @throws ZkException if the client is not connected or registration fails
     */
    public void listenData(Listener listen, boolean instantly) throws ZkException {
        this.checkStatus();
        WatcherProcess process = findProcess();
        if (process != null) {
            process.listen(listen, false, instantly);
        }
    }

    /**
     * Removes the data-change listener of a node.
     *
     * @param nodePath node path
     * @throws ZkException if the client is not connected or removal fails
     */
    public void unlistenData(String nodePath) throws ZkException {
        this.checkStatus();
        WatcherProcess process = findProcess();
        if (process != null) {
            process.unlisten(nodePath, false);
        }
    }

    /** Waits for the connection permit; closes the handle and fails if it does not arrive in time. */
    private void awaitConnection(ZooKeeper handle) throws ZkException {
        boolean connected;
        try {
            connected = connectionLock.tryAcquire(connectionTimeout, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ZkException("connectionLock", e);
        }
        if (!connected) {
            log.error("zookeeper connection timeout. host:" + hosts);
            closeHandle(handle);
            throw new ZkException("zookeeper connection timeout. host:" + hosts);
        }
    }

    /** Closes a handle, converting an interrupt into a {@link ZkException} after restoring the flag. */
    private void closeHandle(ZooKeeper handle) throws ZkException {
        try {
            handle.close();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ZkException("close zookeeper handle, host=" + hosts, e);
        }
    }

    /** Returns the watcher process, or logs a warning and returns {@code null} if none is installed. */
    private WatcherProcess findProcess() {
        WatcherProcess process = watcher.getProcess();
        if (process == null) {
            log.warn("not found WatcherProcess instance,Listening can't be triggered.");
        }
        return process;
    }

    /** Returns the current handle, or fails if {@code connect} has never been called. */
    private ZooKeeper requireHandle() throws ZkException {
        ZooKeeper handle = zooKeeper;
        if (handle == null) {
            throw new ZkException("Not connected to the zookeeper server,host=" + hosts + ",invoking this.connect().");
        }
        return handle;
    }

    /** Verifies that a handle exists and is in the {@code CONNECTED} state. */
    private boolean checkStatus() throws ZkException {
        ZooKeeper.States state = requireHandle().getState();
        if (state != ZooKeeper.States.CONNECTED) {
            throw new ZkException("Not connected to the zookeeper server,host=" + hosts + ",state: " + state);
        }
        return true;
    }

    /** Returns {@code true} if a handle exists and reports itself as connected. */
    private boolean checkConnection() {
        return zooKeeper != null && zooKeeper.getState().isConnected();
    }

    /**
     * Manual smoke test: connects to a fixed server, reads the children of a node and
     * prints data-change events for it for 100 seconds.
     */
    public static void main(String[] args) throws Exception {
        ZkClient client = new ZkClient();
        client.connect("10.12.117.196:2181");
        client.getChild("/config/resource/venus/common", false);
        client.listenData(new Listener("/config/resource/venus/common") {
            @Override
            public void listen(String change, Watcher.Event.EventType eventType, byte[] data) throws ZkException, SocketException {
                System.out.println(Thread.currentThread().getName() + "  " + eventType + "====" + new String(data));
            }
        }, true);
        Thread.sleep(100000);
    }
}
